22 Web 端指令下发控制 ROS2

Web 端指令下发控制 ROS2

关联:索引

要解决的问题

章节内容(本讲核心):

与前置知识衔接(避免重复):

项目工坊:实现网页按钮下发控制指令到 ROS2

学生任务:

大模型任务:

作业(布置):

1)实现 Web 与 ROS2 双向通信(订阅 + 发布)
2)截图数据接收与指令下发效果
3)提供关键代码说明


本讲配套前端项目:09_rosbridge_web_control_panel/(Vue3 + Vite + TypeScript)。

推荐项目结构(与本讲代码一一对应):

09_rosbridge_web_control_panel/
  src/
    App.vue
    components/
      RosbridgeCommandMinimal.vue
      SortingArmControlPanel.vue
    utils/
      sorting-arm-protocol.ts
      sorting-arm-web-control-client.ts

创建与运行(在你的工作目录执行):

npm.cmd create vite@latest 09_rosbridge_web_control_panel -- --template vue-ts --no-interactive
cd 09_rosbridge_web_control_panel
npm.cmd install
npm.cmd run dev

本讲统一用 std_msgs/msg/String 携带 JSON 字符串,原因:

话题约定

说明:

指令消息(String 内部 JSON)推荐结构:ArmCommand(与《人工智能综合实践》15 一致)

{
  "cmd_id": "C-20260412-0001",
  "scene": "sorting",
  "device_type": "arm",
  "device_id": "arm_01",
  "action": "stop",
  "params": { "reason": "user_click" },
  "safety": { "require_enable": true, "require_guard_closed": true },
  "meta": { "user": "stu01", "role": "operator" },
  "ts_ms": 1710000000000
}

回传消息(String 内部 JSON)推荐结构:ArmStatus(与《人工智能综合实践》15 一致)

{
  "device_type": "arm",
  "device_id": "arm_01",
  "state": "idle",
  "last_cmd_id": "C-20260412-0001",
  "ok": true,
  "code": "OK",
  "message": "stop executed",
  "detail": { "progress": 1.0 },
  "ts_ms": 1710000001234
}

  1. WebSocket 已连接 rosbridge(状态为 OPEN)
  2. 点击按钮后,Web 端发出 op=publish 的 JSON
  3. ROS2 侧能看到 /sorting_arm/cmd 收到的 std_msgs/msg/String
  4. ROS2 侧发布 /sorting_arm/status,Web 端能收到并在页面显示结果

1) rosbridge publish 最小结构(协议层)

{
  "op": "publish",
  "topic": "/sorting_arm/cmd",
  "msg": {
    "data": "{\"cmd_id\":\"C-20260412-0001\",\"scene\":\"sorting\",\"device_type\":\"arm\",\"device_id\":\"arm_01\",\"action\":\"stop\",\"params\":{\"reason\":\"user_click\"},\"safety\":{\"require_enable\":true,\"require_guard_closed\":true},\"meta\":{\"user\":\"stu01\",\"role\":\"operator\"},\"ts_ms\":1710000000000}"
  }
}

2) 为什么建议“String 携带 JSON”?

说明:

1) 控制面板组件:按钮下发 ArmAction,并监听 status(项目版)

本讲项目中:

src/App.vue(最小接线):

<template>
  <RosbridgeCommandMinimal />
</template>

<script setup lang="ts">
import RosbridgeCommandMinimal from './components/RosbridgeCommandMinimal.vue'
</script>

src/components/RosbridgeCommandMinimal.vue(同名落位组件):

<template>
  <SortingArmControlPanel />
</template>

<script setup lang="ts">
import SortingArmControlPanel from './SortingArmControlPanel.vue'
</script>

src/components/SortingArmControlPanel.vue(项目核心实现:连接 + 权限 + 下发 + 回传匹配):

<template>
  <div class="wrap">
    <h2>Web → ROS2 指令下发(/sorting_arm/cmd)</h2>

    <div class="row">
      <label class="label" for="url">WebSocket URL</label>
      <input id="url" v-model="url" class="input" type="text" />
    </div>

    <div class="row">
      <label class="label" for="deviceId">device_id</label>
      <input id="deviceId" v-model="deviceId" class="input" type="text" />
    </div>

    <div class="row">
      <div class="status">连接状态:{{ connStatus }}</div>
      <button class="btn" type="button" :disabled="connStatus === 'OPEN' || connStatus === 'CONNECTING'" @click="connect">
        连接
      </button>
      <button class="btn" type="button" :disabled="connStatus !== 'OPEN'" @click="disconnect">
        断开
      </button>
    </div>

    <div class="row">
      <div class="status">当前角色:{{ role }}</div>
      <select v-model="role" class="select">
        <option value="viewer">viewer(只读)</option>
        <option value="operator">operator(可控)</option>
        <option value="admin">admin(高权限)</option>
      </select>
    </div>

    <div class="row">
      <button class="btn primary" type="button" :disabled="!canSend('home') || sending" @click="send('home')">
        {{ sending && currentAction === 'home' ? '下发中...' : '回零(home)' }}
      </button>
      <button class="btn primary" type="button" :disabled="!canSend('stop') || sending" @click="send('stop')">
        {{ sending && currentAction === 'stop' ? '下发中...' : '停止(stop)' }}
      </button>
      <button class="btn danger" type="button" :disabled="!canSend('e_stop') || sending" @click="send('e_stop')">
        {{ sending && currentAction === 'e_stop' ? '下发中...' : '急停(e_stop)' }}
      </button>
    </div>

    <div class="row">
      <div class="status">最近一次 cmd_id:{{ lastCmdId || '(暂无)' }}</div>
    </div>

    <div class="row">
      <div class="status">最近一次结果:{{ lastResultText || '(暂无)' }}</div>
    </div>

    <div v-if="lastRawStatus" class="row">
      <details>
        <summary>最近一条 /sorting_arm/status 原始 JSON</summary>
        <pre class="pre">{{ lastRawStatus }}</pre>
      </details>
    </div>

    <div v-if="lastError" class="row error">错误:{{ lastError }}</div>
  </div>
</template>

<script setup lang="ts">
import { onUnmounted, ref } from 'vue'
import type { ArmAction, ArmStatus, Role } from '../utils/sorting-arm-protocol'
import { SortingArmWebControlClient } from '../utils/sorting-arm-web-control-client'

type ConnStatus = 'IDLE' | 'CONNECTING' | 'OPEN' | 'CLOSING' | 'CLOSED'

const url = ref('ws://localhost:19090')
const deviceId = ref('arm_01')

const connStatus = ref<ConnStatus>('IDLE')
const role = ref<Role>('operator')

const lastCmdId = ref('')
const lastResultText = ref('')
const lastRawStatus = ref('')
const lastError = ref('')

const sending = ref(false)
const currentAction = ref<ArmAction | null>(null)

const client = new SortingArmWebControlClient()

const actionPermission: Record<Role, ArmAction[]> = {
  viewer: [],
  operator: ['home', 'stop'],
  admin: ['home', 'stop', 'e_stop', 'pick_place']
}

function canSend(action: ArmAction): boolean {
  return connStatus.value === 'OPEN' && actionPermission[role.value].includes(action)
}

function setConnStatusFromReadyState(sock: WebSocket): void {
  connStatus.value =
    sock.readyState === WebSocket.CONNECTING
      ? 'CONNECTING'
      : sock.readyState === WebSocket.OPEN
      ? 'OPEN'
      : sock.readyState === WebSocket.CLOSING
      ? 'CLOSING'
      : 'CLOSED'
}

function connect(): void {
  lastError.value = ''
  lastResultText.value = ''
  lastRawStatus.value = ''

  client.connect(url.value)
  const ws = client.getSocket()
  if (!ws) return
  setConnStatusFromReadyState(ws)
  ws.addEventListener('open', () => setConnStatusFromReadyState(ws))
  ws.addEventListener('close', () => setConnStatusFromReadyState(ws))
  ws.addEventListener('error', () => {
    lastError.value = 'WebSocket error'
  })
}

function disconnect(): void {
  try {
    connStatus.value = 'CLOSING'
    client.disconnect()
  } finally {
    connStatus.value = 'CLOSED'
  }
}

function formatStatusText(status: ArmStatus): string {
  const okText = status.ok ? '成功' : '失败'
  const codeText = status.code ? `(${status.code})` : ''
  const msgText = status.message || ''
  const stateText = status.state ? ` state=${status.state}` : ''
  return `${okText}${codeText} ${msgText}${stateText}`.trim()
}

async function send(action: ArmAction): Promise<void> {
  if (!canSend(action)) return
  lastError.value = ''
  lastResultText.value = ''
  sending.value = true
  currentAction.value = action

  const params =
    action === 'home'
      ? {}
      : action === 'stop'
      ? { reason: 'user_click' }
      : action === 'e_stop'
      ? { reason: 'user_click' }
      : {}

  try {
    const { cmdId, status } = await client.sendCommand({
      user: 'stu01',
      role: role.value,
      deviceId: deviceId.value,
      action,
      params,
      timeoutMs: 3000
    })
    lastCmdId.value = cmdId
    lastResultText.value = formatStatusText(status)
    lastRawStatus.value = JSON.stringify(status, null, 2)
  } catch (e) {
    lastError.value = e instanceof Error ? e.message : 'send failed'
  } finally {
    sending.value = false
    currentAction.value = null
  }
}

onUnmounted(() => {
  disconnect()
})
</script>

代码自检要点(对应要求:代码/命令必须解释):

2) ROS2 侧自测:如何验证 Web 端确实发出了指令?

在 ROS2 侧终端执行:

ros2 topic echo /sorting_arm/cmd

优先推荐:直接复用《人工智能综合实践》15 提供的 ROS2 Python 功能包 sorting_arm_mock(可直接 ros2 run)。如果你不方便构建 ROS2 包,再用下面的“单文件 demo”临时跑通回传。

按《人工智能综合实践》15 的统一口径,在 ROS2 环境终端执行:

cd 09_rosbridge_control_tool/ros2_ws
colcon build --symlink-install
source install/setup.bash
ros2 run sorting_arm_mock arm_mock
import json
import time

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class SortingArmStatusDemo(Node):
    def __init__(self) -> None:
        super().__init__("sorting_arm_status_demo")
        self.sub = self.create_subscription(String, "/sorting_arm/cmd", self.on_cmd, 10)
        self.pub = self.create_publisher(String, "/sorting_arm/status", 10)

    def on_cmd(self, msg: String) -> None:
        try:
            cmd = json.loads(msg.data)
        except Exception:
            return

        cmd_id = cmd.get("cmd_id")
        if not isinstance(cmd_id, str):
            return

        status = {
            "device_type": "arm",
            "device_id": cmd.get("device_id", "arm_01"),
            "state": "idle",
            "last_cmd_id": cmd_id,
            "ok": True,
            "code": "OK",
            "message": "mock status",
            "detail": {"received_action": cmd.get("action")},
            "ts_ms": int(time.time() * 1000),
        }

        out = String()
        out.data = json.dumps(status, ensure_ascii=False)
        self.pub.publish(out)

def main() -> None:
    rclpy.init()
    node = SortingArmStatusDemo()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()

运行方式(示例):

python3 sorting_arm_status_demo.py

一、工程化目标:把“指令发送”从组件里抽出去

目标(组件只做 UI):

二、统一类型:TypeScript 定义(可复制粘贴)

建议放到 src/utils/sorting-arm-protocol.ts(相对路径)。

export type Role = 'viewer' | 'operator' | 'admin'

export type ArmAction = 'home' | 'stop' | 'e_stop' | 'pick_place'

export type ArmCommand<TParams = Record<string, unknown>> = {
  cmd_id: string
  scene: 'sorting'
  device_type: 'arm'
  device_id: string
  action: ArmAction
  params: TParams
  safety: { require_enable: boolean; require_guard_closed: boolean }
  meta?: { user: string; role: Role }
  ts_ms: number
}

export type ArmStatus = {
  device_type: 'arm'
  device_id: string
  state: 'idle' | 'running' | 'error' | 'estop'
  last_cmd_id: string
  ok: boolean
  code: 'OK' | 'DENY' | 'BAD_REQUEST' | 'EXEC_ERROR'
  message: string
  detail?: Record<string, unknown>
  ts_ms: number
}

三、权限控制:前端 UI 限制 + ROS2 侧二次校验(最小模型)

1) 前端权限(按钮是否可点)

import type { ArmAction, Role } from './sorting-arm-protocol'

const actionPermission: Record<Role, ArmAction[]> = {
  viewer: [],
  operator: ['home', 'stop'],
  admin: ['home', 'stop', 'e_stop', 'pick_place']
}

export function canSendAction(role: Role, action: ArmAction): boolean {
  return actionPermission[role].includes(action)
}

2) ROS2 执行侧权限(必须二次校验)

四、请求-回传封装:sendCommand(核心)

下面给出一个“只依赖浏览器 WebSocket”的封装示例(与配套项目 09_rosbridge_web_control_panel/src/utils/sorting-arm-web-control-client.ts 对齐)。

import type { ArmCommand, ArmStatus, Role } from './sorting-arm-protocol'

type Pending = {
  resolve: (status: ArmStatus) => void
  reject: (err: Error) => void
  timer: number
}

type RosbridgePublish = { op: 'publish'; topic: string; msg: { data: string } }
type RosbridgeSubscribe = { op: 'subscribe'; topic: string; type: 'std_msgs/msg/String'; id: string }
type RosbridgeIncoming = { op: 'publish'; topic: string; msg: { data?: unknown } }

export class SortingArmWebControlClient {
  private ws: WebSocket | null = null
  private readonly cmdTopic: string
  private readonly statusTopic: string
  private readonly pending: Map<string, Pending> = new Map()

  constructor(options: { cmdTopic?: string; statusTopic?: string } = {}) {
    this.cmdTopic = options.cmdTopic ?? '/sorting_arm/cmd'
    this.statusTopic = options.statusTopic ?? '/sorting_arm/status'
  }

  connect(url: string): void {
    if (this.ws && (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)) return
    this.ws = new WebSocket(url)

    this.ws.addEventListener('open', () => {
      if (!this.ws) return
      const sub: RosbridgeSubscribe = { op: 'subscribe', topic: this.statusTopic, type: 'std_msgs/msg/String', id: 'sub-sorting-arm-status' }
      this.ws.send(JSON.stringify(sub))
    })

    this.ws.addEventListener('message', (ev) => {
      const raw = typeof ev.data === 'string' ? ev.data : ''
      if (!raw) return

      let data: unknown
      try {
        data = JSON.parse(raw)
      } catch {
        return
      }

      if (!data || typeof data !== 'object') return
      if (!('op' in data) || !('topic' in data) || !('msg' in data)) return

      const d = data as RosbridgeIncoming
      if (d.op !== 'publish' || d.topic !== this.statusTopic) return

      const msgData = d.msg?.data
      if (typeof msgData !== 'string') return

      let status: unknown
      try {
        status = JSON.parse(msgData)
      } catch {
        return
      }
      if (!status || typeof status !== 'object') return
      if (!('last_cmd_id' in status) || !('ok' in status) || !('code' in status) || !('message' in status)) return

      const s = status as ArmStatus
      const p = this.pending.get(s.last_cmd_id)
      if (!p) return

      window.clearTimeout(p.timer)
      this.pending.delete(s.last_cmd_id)
      p.resolve(s)
    })

    this.ws.addEventListener('close', () => {
      for (const [cmdId, p] of this.pending.entries()) {
        window.clearTimeout(p.timer)
        p.reject(new Error(`connection closed, cmd_id=${cmdId}`))
      }
      this.pending.clear()
    })
  }

  getSocket(): WebSocket | null {
    return this.ws
  }

  disconnect(): void {
    if (!this.ws) return
    this.ws.close()
    this.ws = null
  }

  publishCommand(cmd: ArmCommand): void {
    const ws = this.ws
    if (!ws || ws.readyState !== WebSocket.OPEN) throw new Error('WebSocket not open')
    const payload = JSON.stringify(cmd)
    const msg: RosbridgePublish = { op: 'publish', topic: this.cmdTopic, msg: { data: payload } }
    ws.send(JSON.stringify(msg))
  }

  async sendCommand<TParams extends Record<string, unknown>>(input: {
    user: string
    role: Role
    deviceId: string
    action: ArmCommand['action']
    params: TParams
    safety?: ArmCommand['safety']
    timeoutMs?: number
  }): Promise<{ cmdId: string; status: ArmStatus }> {
    const cmdId = makeCmdId()
    const cmd: ArmCommand<TParams> = {
      cmd_id: cmdId,
      scene: 'sorting',
      device_type: 'arm',
      device_id: input.deviceId,
      action: input.action,
      params: input.params,
      safety: input.safety ?? { require_enable: true, require_guard_closed: true },
      meta: { user: input.user, role: input.role },
      ts_ms: Date.now()
    }

    this.publishCommand(cmd)

    const timeoutMs = input.timeoutMs ?? 3000
    const status = await new Promise<ArmStatus>((resolve, reject) => {
      const timer = window.setTimeout(() => {
        this.pending.delete(cmdId)
        reject(new Error(`status timeout, cmd_id=${cmdId}`))
      }, timeoutMs)

      this.pending.set(cmdId, { resolve, reject, timer })
    })

    return { cmdId, status }
  }
}

export function makeCmdId(): string {
  const ts = Date.now()
  const rand = Math.random().toString(16).slice(2, 8)
  const day = new Date(ts)
  const y = String(day.getFullYear())
  const m = String(day.getMonth() + 1).padStart(2, '0')
  const d = String(day.getDate()).padStart(2, '0')
  return `C-${y}${m}${d}-${rand}`
}

关键点解释:

你可以把下面模板直接发给 AI,让它输出“可粘贴进项目”的代码。

模板 1:生成 ArmCommand/ArmStatus 格式与类型

你是前端工程师。请为 Web → ROS2 的指令下发设计一个 JSON 消息格式(ArmCommand)和回传格式(ArmStatus)。
要求:复用“机械臂控制”骨架:cmd_id/scene/device_type/device_id/action/params/safety/ts_ms,回传包含 last_cmd_id/state/ok/code/message/detail/ts_ms,并给出 TypeScript 类型定义与 3 条示例消息。
约束:消息承载使用 std_msgs/msg/String 的 data 字段携带 JSON 字符串;action 使用字符串白名单联合类型;保证字段命名一致并能用于 cmd_id/last_cmd_id 匹配。

模板 2:生成 sendCommand 封装(带超时与回传匹配)

你是 TypeScript 工程师。请实现一个 SortingArmWebControlClient:

模板 3:生成 ROS2 Python 执行节点(含权限校验)

你是 ROS2 Python 工程师。请编写 rclpy 节点:订阅 /sorting_arm/cmd (std_msgs/msg/String),解析 JSON 指令(含 cmd_id/action/device_id/meta.role),按 role 做动作白名单权限校验;执行通过则回传 /sorting_arm/status (std_msgs/msg/String);status 必须复用 last_cmd_id=cmd_id。
输出:可运行的 Python 文件,包含错误处理(JSON 解析失败/字段缺失/权限拒绝),并给出 ros2 topic echo 的验证步骤。


课后作业(提交要求)

1)实现 Web 与 ROS2 双向通信(订阅 + 发布)
2)截图数据接收与指令下发效果(至少两张:收到数据、下发成功或失败提示)
3)提供关键代码说明(至少包含:消息格式、sendCommand/订阅 status 的核心逻辑、权限控制点)

参考与延伸